package cn.doitedu.rtdw.rt_report;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">Doit Education (多易教育)</a>
 * @QQ: 657270652
 * @Date: 2023/2/9
 * @Desc: Learn big data at Doit Education.
 * Real-time dashboard report: for each hour and each brand, the top 10 products by sales amount.
 *
 * <p>This class implements the first stage of that report only: it joins the order table with
 * the order-item table (both read from MySQL through the {@code mysql-cdc} connector) and writes
 * the joined rows to an {@code upsert-kafka} table. The hourly top-N computation and the write to
 * the JDBC report table are not performed here; the report table is only registered.
 *
 * <p>All monetary columns are declared with an explicit scale. A bare {@code DECIMAL} in Flink SQL
 * means {@code DECIMAL(10, 0)}, which would drop the fractional part of prices and amounts.
 **/
public class BrandTop10Product_JOIN {

    /** Interval between checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 2000L;

    /** Location where checkpoints are stored (a local Windows path). */
    private static final String CHECKPOINT_STORAGE = "file:/d:/ckpt";

    /**
     * Job entry point: configures the execution environment, registers all tables and submits
     * the join-to-Kafka INSERT statement.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Execution environment: exactly-once checkpointing, single parallelism.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_STORAGE);
        env.setParallelism(1);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        createOrderSource(tenv);
        createOrderItemSource(tenv);
        createBrandTopProductSink(tenv);
        createJoinedKafkaTable(tenv);

        submitOrderItemJoin(tenv);

        // TODO: compute the per-hour, per-brand top 10 products and write them to
        // band_top_pd_sink (the MySQL table read by the dashboard). Not implemented in this class.
    }

    /**
     * Registers {@code order_source}, a mysql-cdc table mapped to {@code realtimedw.oms_order}
     * (the order master table of the business database).
     */
    private static void createOrderSource(StreamTableEnvironment tenv) {
        tenv.executeSql(
                "CREATE TABLE order_source (                         " +
                        "      id BIGINT,                                     " +
                        "      create_time timestamp(3),                      " +
                        "     PRIMARY KEY (id) NOT ENFORCED            " +
                        "     ) WITH (                                 " +
                        "     'connector' = 'mysql-cdc',               " +
                        "     'hostname' = 'doitedu'   ,               " +
                        "     'port' = '3306'          ,               " +
                        "     'username' = 'root'      ,               " +
                        "     'password' = 'root'      ,               " +
                        "     'database-name' = 'realtimedw',          " +
                        "     'table-name' = 'oms_order'          " +
                        ")");
    }

    /**
     * Registers {@code order_item_source}, a mysql-cdc table mapped to
     * {@code realtimedw.oms_order_item} (the order detail table of the business database).
     */
    private static void createOrderItemSource(StreamTableEnvironment tenv) {
        // product_price carries an explicit scale; a bare DECIMAL would be DECIMAL(10, 0).
        // NOTE(review): (10, 2) assumes the MySQL column is decimal(10,2) - confirm against the DDL.
        tenv.executeSql(
                "CREATE TABLE order_item_source (    " +
                        "      id BIGINT,                                   " +
                        "      order_id BIGINT,                             " +
                        "      product_id BIGINT,                           " +
                        "      product_name STRING,                         " +
                        "      product_brand STRING,                        " +
                        "      product_quantity INT,                        " +
                        "      product_price DECIMAL(10, 2),                " +
                        "     PRIMARY KEY (id) NOT ENFORCED            " +
                        "     ) WITH (                                 " +
                        "     'connector' = 'mysql-cdc',               " +
                        "     'hostname' = 'doitedu'   ,               " +
                        "     'port' = '3306'          ,               " +
                        "     'username' = 'root'      ,               " +
                        "     'password' = 'root'      ,               " +
                        "     'database-name' = 'realtimedw',          " +
                        "     'table-name' = 'oms_order_item'          " +
                        ")");
    }

    /**
     * Registers {@code band_top_pd_sink}, a JDBC table mapped to
     * {@code rtrpt.rtrpt_brand_topn_item}, intended to hold the report result.
     *
     * <p>No statement in this class writes to it. The table name spelling ("band") is kept as-is
     * because other jobs may refer to it.
     */
    private static void createBrandTopProductSink(StreamTableEnvironment tenv) {
        // sale_amt carries an explicit scale so that amounts keep their fractional part.
        // NOTE(review): (16, 2) is an assumption - align with the target MySQL column.
        tenv.executeSql(
                " CREATE TABLE band_top_pd_sink (                   "
                        + "   window_start   timestamp(3),                    "
                        + "   window_end     timestamp(3),                    "
                        + "   brand_name     STRING,                          "
                        + "   product_name   STRING,                          "
                        + "   sale_amt       DECIMAL(16, 2),                  "
                        + "   `rank`           BIGINT                           "
                        + " ) WITH (                                          "
                        + "    'connector' = 'jdbc',                          "
                        + "    'url' = 'jdbc:mysql://doitedu:3306/rtrpt',     "
                        + "    'table-name' = 'rtrpt_brand_topn_item',        "
                        + "    'username' = 'root',                           "
                        + "    'password' = 'root'                            "
                        + " )                                                 "
        );
    }

    /**
     * Registers {@code tmp_kafka}, an upsert-kafka table on topic {@code order-item-tmp} that
     * stores the result of joining orders with order items. The row key is the order-item
     * {@code id}; key and value are both JSON, and the value excludes the key field.
     */
    private static void createJoinedKafkaTable(StreamTableEnvironment tenv) {
        // product_price uses the same precision/scale as order_item_source.product_price.
        tenv.executeSql(
                " CREATE TABLE tmp_kafka(          "
                        +"   id    BIGINT PRIMARY KEY NOT ENFORCED     "
                        +"  ,order_id       BIGINT             "
                        +"  ,product_id    BIGINT              "
                        +"  ,product_name  STRING       "
                        +"  ,product_brand STRING       "
                        +"  ,product_quantity  INT       "
                        +"  ,product_price     DECIMAL(10, 2)  "
                        +"  ,order_time  timestamp(3) "
                        + " ) WITH (                                             "
                        + "  'connector' = 'upsert-kafka',                      "
                        + "  'topic' = 'order-item-tmp',                       "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                        + "  'properties.group.id' = 'testGroup',                "
                        + "  'key.format'='json',                              "
                        + "  'value.format'='json',                              "
                        + "  'value.json.fail-on-missing-field'='false',         "
                        + "  'value.fields-include' = 'EXCEPT_KEY')              ");
    }

    /**
     * Submits the INSERT that inner-joins {@code order_item_source} with {@code order_source} on
     * {@code order_id = id} and writes each item, enriched with the order's {@code create_time}
     * (as {@code order_time}), into {@code tmp_kafka}.
     *
     * <p>The returned {@code TableResult} is ignored, so this method does not wait for the job.
     * NOTE(review): when launched from a local/IDE environment, consider awaiting the result so
     * the JVM stays alive while the streaming job runs - confirm for the target deployment.
     */
    private static void submitOrderItemJoin(StreamTableEnvironment tenv) {
        tenv.executeSql(
                " INSERT INTO tmp_kafka          "
                        +" SELECT                         "
                        +"  i.id                    "
                        +"  ,i.order_id                    "
                        +"  ,i.product_id                 "
                        +"  ,i.product_name               "
                        +"  ,i.product_brand              "
                        +"  ,i.product_quantity           "
                        +"  ,i.product_price              "
                        +"  ,o.create_time as order_time  "
                        +" FROM order_item_source i       "
                        +" JOIN order_source o            "
                        +" ON i.order_id = o.id           "
        );
    }

}
